package org.databandtech.job.jobs;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.util.OptionsFileUtil;
import org.databandtech.job.entity.HdfsToJdbcS1;
import org.databandtech.job.entity.ScheduledTaskJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.sqoop.tool.SqoopTool;

/**
 * Scheduled job that exports HDFS directories into JDBC (MySQL) tables using
 * Sqoop 1's {@code export} tool, invoked in-process via the Sqoop Java API.
 *
 * <p>Each {@link HdfsToJdbcS1} instance describes one export: source HDFS dir,
 * target table, connection info and field/line delimiters. Instances are loaded
 * by {@link #getInstances()}; a failure in one instance is logged and does not
 * prevent the remaining instances from running.
 */
public class HdfsToJdbcSqoop1Job implements ScheduledTaskJob{

	private static final Logger LOGGER = LoggerFactory.getLogger(HdfsToJdbcSqoop1Job.class);

	/** Standalone entry point for running the job outside the scheduler. */
	public static void main(String[] args) {
		new HdfsToJdbcSqoop1Job().run();
	}

	/**
	 * Loads the export instances for this job.
	 *
	 * <p>A job may contain multiple instances; they can come from a database,
	 * a configuration file, or (as here) hard-coded Java values.
	 *
	 * @return the list of export instances to execute, never {@code null}
	 */
	private static List<HdfsToJdbcS1> getInstances() {
		List<HdfsToJdbcS1> s1List = new ArrayList<>();

		// NOTE(review): credentials are hard-coded for demo purposes; move them
		// to external configuration before production use.
		HdfsToJdbcS1 s1 = new HdfsToJdbcS1();
		s1.setDburl("jdbc:mysql://localhost:3306/databand?useSSL=false&useUnicode=true&characterEncoding=utf-8");
		s1.setDriver("com.mysql.jdbc.Driver");
		s1.setDbuser("root");
		s1.setDbpassword("mysql");
		s1.setTotable("b_video");
		s1.setExportdir("/user/java_import_mockinstances1");
		s1.setM("2");
		s1.setFieldsterminated(",");
		s1.setLinesterminated("\n");
		s1.setDefaultfs("hdfs://hadoop001:8020");
		s1List.add(s1);

		return s1List;
	}

	/**
	 * Runs a single Sqoop {@code export} for the given instance.
	 *
	 * @param s1 export configuration (connection, table, HDFS dir, delimiters)
	 * @return the Sqoop exit status; 0 means success, non-zero means failure
	 * @throws Exception if argument expansion or Sqoop setup fails
	 */
	private static int execJob(HdfsToJdbcS1 s1) throws Exception {
        // Use the documented double-dash long options throughout
        // (the original mixed "-username"/"-password" with "--connect").
        String[] args = new String[] {
                "--connect",s1.getDburl(),
                "--driver",s1.getDriver(),
                "--username",s1.getDbuser(),
                "--password",s1.getDbpassword(),
                "--table",s1.getTotable(),
                "-m",s1.getM(),
                "--export-dir",s1.getExportdir(),
                "--input-fields-terminated-by",s1.getFieldsterminated(),
                "--input-lines-terminated-by",s1.getLinesterminated()
        };

        String[] expandArguments = OptionsFileUtil.expandArguments(args);

        SqoopTool tool = SqoopTool.getTool("export");

        Configuration conf = new Configuration();
        // Standalone:
        //conf.set("fs.default.name", "hdfs://hadoop001:8020");
        // Cluster: point at the configured HDFS namenode.
        conf.set("fs.defaultFS", s1.getDefaultfs());

        Configuration loadPlugins = SqoopTool.loadPlugins(conf);

        Sqoop sqoop = new Sqoop(tool, loadPlugins);
        return Sqoop.runSqoop(sqoop, expandArguments);
    }

	@Override
	public void run() {
		LOGGER.info("ScheduledTask => {}  run  当前线程名称 {} ",this.getClass().getSimpleName(), Thread.currentThread().getName());
		// Load instances (from database, config file, or Java code).
		List<HdfsToJdbcS1> s1List = getInstances();

		for (HdfsToJdbcS1 instance : s1List) {
			// One failing instance must not abort the remaining exports,
			// and Sqoop's exit status must not be silently discarded.
			try {
				int status = execJob(instance);
				if (status != 0) {
					LOGGER.error("Sqoop export to table {} failed with exit status {}",
							instance.getTotable(), status);
				}
			} catch (Exception e) {
				LOGGER.error("Sqoop export to table {} threw an exception",
						instance.getTotable(), e);
			}
		}
	}

}
